image-2.png

Анализ логов веб-сайта.
¶

выполнил: Морозов Е.А.


1. Постановка задачи.¶

Необходимо осуществить анализ логов, создав скрипт для формирования витрины на основе логов web-сайта. Разработаный скрипт витрины данных должен иметь следующее содержание и отображать следующие данные:

  1. Суррогатный ключ устройства.
  2. Название устройства.
  3. Количество пользователей.
  4. Доля пользователей данного устройства от общего числа пользователей.
  5. Количество совершенных действий для данного устройства.
  6. Доля совершенных действий с данного устройства, относительно других устройств.
  7. Список из 5 самых популярных браузеров, используемых на данном устройстве различными пользователями, с указанием доли использования для данного браузера относительно остальных браузеров.
  8. Количество ответов сервера, отличных от 200, на данном устройстве.
  9. Для каждого из ответов сервера, отличных от 200, сформировать поле, в котором будет содержаться количество ответов данного типа.

Источник данных (логи web-сайта): https://disk.yandex.ru/d/__


2. Ход работы.¶

Выполнение разбито на стадии в рамках ETL-процесса: extract, transform и load.

  • Загрузка данных.

  • Извлечение данных.
    Состоит в обработке файла логов с помощью методов ЯП *python* для извлечения структурированной информации.

  • Преобразование данных.
    Состоит в аггрегировании и трансформации сведенных в табличную форму данных выбранными для этого инструментами/в соотв. библиотеках (Базы Данных, табличные прцессоры Pandas/Dusk/SPARK). Позволит найти ответы на вопросы задания, группируя и сортируя данные подходящим способом.

  • Выгрузка результатов.
    Сформированные в результате работы скрипта витрины данных сохраняются в подходящем для дальнейшего применения виде.

  • Выводы об эффективности процесса.


2.1. Пояснения файловой структуры проекта.¶

Код:

# организация хранения данных проекта
print("В рабочей директории создана папка 'morozov_ea' для организации хранения данных проекта.\n")
! ls -lah ..

Вывод:

В рабочей директории создана папка 'morozov_ea' для организации хранения данных проекта.

total 48K
drwxr-x---  9 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Dec 22 18:33 .
drwxr-xr-x 31 root                   root                   4.0K Dec 29 14:51 ..
-rw-r--r--  1 jupyter-morozov_evgeny jupyter-morozov_evgeny  220 Feb 25  2020 .bash_logout
-rw-r--r--  1 jupyter-morozov_evgeny jupyter-morozov_evgeny 3.7K Feb 25  2020 .bashrc
drwxr-xr-x  5 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Dec 22 18:34 .cache
drwxrwsr-x  3 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Dec 22 18:33 .conda
lrwxrwxrwx  1 jupyter-morozov_evgeny jupyter-morozov_evgeny   55 Dec 16 11:34 dags -> /root/data-analysis/airflow/dags/jupyter-morozov_evgeny
drwxr-xr-x  2 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Dec 16 22:19 .ipynb_checkpoints
drwxr-xr-x  3 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Dec 16 22:25 .ipython
drwxr-xr-x  3 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Dec 21 21:52 .jupyter
drwxr-xr-x  6 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Dec 22 18:34 .local
drwxr-xr-x  6 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Jan  9 00:12 morozov_ea
-rw-r--r--  1 jupyter-morozov_evgeny jupyter-morozov_evgeny  807 Feb 25  2020 .profile

Код:

# структура проекта и файлы
print("Папка проекта содержит:\n - папку",
      "\033[1m" + "'input_data'" + "\033[0m",
      "с исходными данными задания,\n - папку",
      "\033[1m" + "'output_data'" + "\033[0m",
      "с файлами, получаемыми в ходе работы и считающимися итоговыми результатами,\n - папку",
      "\033[1m" + "'spark-warehouse'" + "\033[0m",
      ", создаваемую при работе фреймворка SPARK (подробнее в соотв. разделе исследования).\n")
!ls -lh

Вывод:

Папка проекта содержит:
 - папку 'input_data' с исходными данными задания,
 - папку 'output_data' с файлами, получаемыми в ходе работы и считающимися итоговыми результатами,
 - папку 'spark-warehouse' , создаваемую при работе фреймворка SPARK (подробнее в соотв. разделе исследования).

total 264K
drwxr-xr-x 3 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Jan  7 23:10 input_data
-rw-r--r-- 1 jupyter-morozov_evgeny jupyter-morozov_evgeny 251K Jan  9 00:12 morozov_ea_final_03.ipynb
drwxr-xr-x 5 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Jan  8 23:06 output_data
drwxr-xr-x 3 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Jan  9 00:18 spark-warehouse

Код:

print("рабочие файлы в 'input_data':")
! ls -lh ./input_data/

Вывод:

рабочие файлы в 'input_data':
total 3.4G
-rw-r--r-- 1 jupyter-morozov_evgeny jupyter-morozov_evgeny 3.3G Jan  7 23:07 access-Copy1.log
-rw-r--r-- 1 jupyter-morozov_evgeny jupyter-morozov_evgeny  64M Jan  7 23:07 access-Copy1_short.log
-rw-r--r-- 1 jupyter-morozov_evgeny jupyter-morozov_evgeny  13M Dec 17 14:05 client_hostname.csv

После пояснения структуры проекта, начнём исследование.

3. Исследование.¶

Импорты, установка компонентов для работы.¶

# импортируем библиотеки:
import datetime
from datetime import datetime
import numpy as np
import pandas as pd
from random import sample
import re
from tqdm import tqdm
# импорты модулей Spark:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
import pyspark.pandas as ps

Код:

# получим объект из файла
#test_file = open("../morozov_ea/input_data/access-Copy1_short.log", "r") # обрезанная версия лога для тестов
test_file = open("../morozov_ea/input_data/access-Copy1.log", "r")

# считываем все строки
rows = test_file.readlines()

# закрываем файл
test_file.close

print(f'Количество строк в файле логов = {len(rows):_} \n'.replace("_", " "))
print('Образец данных:\n')
for row in sample(rows, 3):
    print(row)

Вывод:

Количество строк в файле логов = 10 365 152 

Образец данных:

5.127.49.29 - - [22/Jan/2019:16:19:52 +0330] "GET /image/43688/productModel/200x200 HTTP/1.1" 200 4662 "https://www.zanbil.ir/m/filter/p49?page=1" "Mozilla/5.0 (Linux; Android 5.1.1; SAMSUNG SM-J320F Build/LMY47V) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/7.4 Chrome/59.0.3071.125 Mobile Safari/537.36" "-"

185.240.149.33 - - [25/Jan/2019:21:28:06 +0330] "GET /static/images/next.png HTTP/1.1" 200 3045 "https://znbl.ir/static/bundle-bundle_site_head.css" "Mozilla/5.0 (Windows NT 6.1; rv:64.0) Gecko/20100101 Firefox/64.0" "-"

Входные данные довольно "грязные" и слабоструктурированные. Практически во всех записях присутствует информация о:

  • IP-адресе с которого происходило обращение к сайту
  • Временной метке события
  • Кодах ответа сервера
  • Записях User_Agent

Потребуется большой объём предобработки.


image.png

4. Extract.¶

Хотя файл логов занимает ощутимый объём (3.5 гБ слабоструктурироавнных записей), с помощью серверной платформы 1Т оказалось вполне возможным обработать записи за 1 проход. Разбиение на батчи и последовательная обработка по частям, к счастью, не потребовались, что позволило несколько сократить объём кода.

Извлечение признаков.¶

  • Разделим строки записей по блокам, создав для этого список значений. Из смысловых блоков последовательно извлечём нужные данные и сохраним их в подходящем для дальнейшего анализа и преобразований виде.

  • Этапы преобразований вынесены в отдельные функции.


Сокращённый код:

# создаём функцию для первичной очистки строк лога от лишних данных

def cleaning(rows):
    new_rows = [] # в ходе очистки строк лога будем наполнять новый список новыми записями
    for element in tqdm(rows, desc='Splitting the rows of data', colour='green'):
        element = re.split('- -|"-"|"', element.strip()) #split с использованием regular_expressions        
        .  .  .  .  .  .  .  .  .  .
    rows.clear()
    return new_rows #функция возвращает список строк, строки разделёны на блоки по разделителям

# по каждой строке-объекту получим список признаков, при дальнейшей обработке сократим количество элементов
# до необходимого кол-ва, а также обработаем сами значения
raw_data_all_features = cleaning(rows_sample)

Вывод:

Splitting the rows of data: 100%|██████████| 10365152/10365152 [01:22<00:00, 125357.62it/s]
Deleting empties: 100%|██████████| 10365152/10365152 [00:09<00:00, 1123350.02it/s]

Сокращённый код:

# создаём функцию для удаления малоинформативных записей и получаем очищенный список на выходе.

def cleaning_uninf(rows):
    drop_that_rows = []                         # пустой список для индексов строк годных для удаления
    for i in tqdm(range(len(rows)), desc='Looking data for uninformative rows', colour='green'):
        if len(rows[i]) < 5:   # принятый критерий отсеивания строк
            drop_that_rows.append(i)
        .  .  .  .  .  .  .  .  .  . 
    return rows_without_dropped

raw_data_all_features = cleaning_uninf(raw_data_all_features)

Вывод:

Looking data for uninformative rows: 100%|██████████| 10365152/10365152 [00:02<00:00, 4140638.28it/s]
Deleting uninformative rows: 100%|██████████| 10365152/10365152 [16:05<00:00, 10736.63it/s]

Малоинформативными являются 14353 строки.
В полученном списке 10350799 строк

Код:

# получим сокращённый список значений (4 признака вместо 6) для парсинга в более понятном виде 

raw_data_4f = []
for k in tqdm(range(len(raw_data_all_features))
              , desc='Creating a list with working data (4 columns istead of 6)'
              , colour='green'):
    raw_data_4f.append([
                        raw_data_all_features[k][0],
                        raw_data_all_features[k][1],
                        raw_data_all_features[k][3],
                        raw_data_all_features[k][-1]
                       ])

raw_data_all_features.clear() # очищаем список raw_data_all_features

Вывод:

Creating a list with working data (4 columns istead of 6): 100%|██████████| 10350799/10350799 [00:21<00:00, 480765.46it/s]

Сокращённый код:

# функция для обработки значений столбца с датами

def date_transformer(dates):
    dates_list = []
    for date in tqdm(dates                            # с пом. reg.expr отделим даты от текста по шаблону
                     , desc='dates extraction'
                     , colour='green'):          
        dates_list.append(re.findall("[^'[][\w\/:]*[^\+0330\]']", date[1]))
       .  .  .  .  .  .  .  .  .  .
    return dates_list                                 # функция возвращает список с преобразованными датами

# получаем список с преобразованными датами
dates_list = date_transformer(raw_data_4f)

Вывод:

dates extraction: 100%|██████████| 10350799/10350799 [00:22<00:00, 464198.05it/s]
converting dates to datetime-format: 100%|██████████| 10350799/10350799 [01:36<00:00, 106926.65it/s]

Сокращённый код:

# функция для обработки значений столбца кодами состояния HTTP

def serv_answers(answers):
    serv_answers_list = []
    for answer in tqdm(answers
       .  .  .  .  .  .  .  .  .  .
    return serv_answers_list                           # функция возвращает список с кодами ответов сервера

# получаем список с преобразованными кодами состояния HTTP
serv_answers_list = serv_answers(raw_data_4f)

Вывод:

server's answers converting: 100%|██████████| 10350799/10350799 [00:14<00:00, 702469.67it/s]

  • Наибольшая часть работы по извлечению данных приходится на записи User_Agent.

Сокращённый код:

# функция для предв. обработки значений столбца User_Agent

def dev_browsers(lines):
    dev_browsers_list = []
    for line in tqdm(lines
       .  .  .  .  .  .  .  .  .  .
        dev_browsers_list.append([*re.findall("\(([\w\s.;:\-\/+@]+)\)", line[-1]),
                                  *re.findall("\w\) {1}([\w\/\s.]+$)", line[-1])])

    return dev_browsers_list                      # функция возвращает список с соотв. значениями User_Agent

 # получаем список с предварительно разделёнными на 2 групппы значениями User_Agent
 # пригодный для выделения искомых данных об устройстве, ОС, браузере
dev_browsers_list = dev_browsers(raw_data_4f)

Вывод:

User_Agent info splitting into device+browser info: 100%|██████████| 10350799/10350799 [00:48<00:00, 214572.21it/s]

Сокращённый код:

# функция обработки данных из User_Agent для выделения инф. об устройстве и ОС

def os_dev(raw_list):
    dev_browsers_list_os = []                       # подготовим пустой список для сбора в него записей    
    for el in tqdm(raw_list, desc='creating a list of devices + OS', colour='green'):
        if el != []:
       .  .  .  .  .  .  .  .  .  .
    return os_dev_list_changed                     # функция возвращает список с инф. об устройстве и ОС


# функции обработки данных из User_Agent для выделения инф. о браузере

def browsers(raw_list):
    dev_browsers_list_br = []                       # подготовим пустой список для сбора в него записей  
    for el in tqdm(dev_browsers_list, desc='creating a list of browsers', colour='green'):
        if len(el) == 2:
       .  .  .  .  .  .  .  .  .  .
    return browsers_list_changed                     # функция возвращает список с инф. о браузерах


# получим списки с информацией из User_Agent в отдельные списки заносим данные об устройствах+ОС и о браузерах
os_dev_list = os_dev(dev_browsers_list)
print('Длина списка os_dev_list (ОС+устройство) =', len(os_dev_list), 'строк.')

browsers_list = browsers(dev_browsers_list)
print('Длина списка browsers_list (используемый браузер) =', len(browsers_list), 'строк.')


# функции для отдельной сведения информации из списков с данными об устройствах/ОС/браузерах в единый список

def dev_os_br_union(os_dev_list, dev_browsers_list):
    united_list = []                                # подготовим пустой список для сбора записей

    # проходим в цикле по всем элементам списков, созданных выше (os_dev_list, browsers_list)
    # и заносим значия в итоговый чтобы 1 устройству соответствовал 1 браузер исходя из простой логики
    for i in tqdm(range(len(os_dev_list)), desc='creating of united device/OS/browser list', colour='green'):

        # ветвления и логика:
        if len(dev_browsers_list[i]) > 1:
       .  .  .  .  .  .  .  .  .  .            
    return united_list                              # функция возвращает список с общей информацией

# получим список с информацией об устройствах+ОС и о браузерах в одной записи
united_usag_list = dev_os_br_union(os_dev_list, browsers_list)

Вывод:

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 5.48 µs
creating a list of devices + OS: 100%|██████████| 10350799/10350799 [00:02<00:00, 3731730.27it/s]
parsing of raw devices_&_OS list: 100%|██████████| 10350799/10350799 [00:18<00:00, 555351.15it/s] 
secondary parsing of devices_&_OS list: 100%|██████████| 10350799/10350799 [00:06<00:00, 1700363.94it/s]
Длина списка os_dev_list (ОС+устройство) = 10350799 строк.
creating a list of browsers: 100%|██████████| 10350799/10350799 [00:02<00:00, 3545831.72it/s]
parsing of raw browsers list: 100%|██████████| 10350799/10350799 [00:34<00:00, 303134.47it/s]
secondary parsing of browsers list: 100%|██████████| 10350799/10350799 [00:08<00:00, 1180266.43it/s]
Длина списка browsers_list (используемый браузер) = 10350799 строк.
creating of united device/OS/browser list: 100%|██████████| 10350799/10350799 [00:17<00:00, 585242.45it/s]

Сведение полученных признаков.¶

Предварительная обработка закончена. Далее подготовим и сведём всю иформацию в датафрейм Pandas.

  1. обработаем данные непосредственно методами самой библиотеки Pandas,

  2. транслируем методы Pandas во фреймворк Spark,

  3. Spark предлагает лаконичную форму работы как с Hive QL (т.е. БД будет возможно хранить в распределённом виде в хранилище Hive), так и простоту подключения к БД PostgreSQL, что должно сократить путь до выгрузки итоговой витрины.

In [16]:
# интересующие нас значения хранятся в списках, составим из них словарь:

# список с IP-адресами предполагается использовать в качестве суррогатного ключа,
# сделаем его значения более похожими на ключи - преобразуем адрес и одно число:
ip_addr_list = [('').join(el[0].split('.')).strip() for el in tqdm(raw_data_4f
                                                                   , desc = 'collecting from ip_addr_list'
                                                                   , colour = 'YELLOW')]

# некоторая обработка потребуется и для значений списков, хранящих по несколько элементов
# в строках в виде вложенных списков, получим из них отдельные столбцы значений:
date = [el[0] for el in tqdm(dates_list
                             , desc = 'collecting from date_list'
                             , colour = 'YELLOW')]
       .  .  .  .  .  .  .  .  .  .
                           , colour = 'YELLOW')]
browser = [el[-1] for el in tqdm(united_usag_list
                                 , desc = 'collecting from browser_list'
                                 , colour = 'YELLOW')]

# сводим всё в словарь для создания датафрейма Pandas
data_dict = {
    'addr' : ip_addr_list,
    'date' : date,
    'serv_answer_1' : serv_answer_1,
    'serv_answer_2' : serv_answer_2,
    'device' : device,
    'os' : os,
    'browser' : browser
}

# создаём dataframe
df = pd.DataFrame(data = data_dict)

# исправим форматы данных на числовые для колонок 'addr', 'serv_answer_1', 'serv_answer_2'
for i in tqdm([0, 2, 3], desc = 'converting data formats', colour = 'green'):
    df[df.columns[i]] = pd.to_numeric(df[df.columns[i]], errors='raise', downcast=None)
    #df[df.columns[i]] = df[df.columns[i]].astype('int') - либо так

# выведем информацию о таблице и первые 5 строк полученного датафрейма:
print('Описание данных и первые 5 строк полученного датафрейма:\n')
df.info()
df.head()
collecting from ip_addr_list: 100%|██████████| 10350799/10350799 [00:04<00:00, 2207416.30it/s]
collecting from date_list: 100%|██████████| 10350799/10350799 [00:02<00:00, 5087080.65it/s]
collecting from serv_answer_1_list: 100%|██████████| 10350799/10350799 [00:01<00:00, 5187492.88it/s]
collecting from serv_answer_2_list: 100%|██████████| 10350799/10350799 [00:01<00:00, 5322285.32it/s]
collecting from device_list: 100%|██████████| 10350799/10350799 [00:01<00:00, 5620854.07it/s]
collecting from os_list: 100%|██████████| 10350799/10350799 [00:01<00:00, 5985918.96it/s]
collecting from browser_list: 100%|██████████| 10350799/10350799 [00:01<00:00, 5276965.55it/s]
converting data formats: 100%|██████████| 3/3 [00:17<00:00,  5.75s/it]
Описание данных и первые 5 строк полученного датафрейма:

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10350799 entries, 0 to 10350798
Data columns (total 7 columns):
 #   Column         Dtype         
---  ------         -----         
 0   addr           int64         
 1   date           datetime64[ns]
 2   serv_answer_1  int64         
 3   serv_answer_2  int64         
 4   device         object        
 5   os             object        
 6   browser        object        
dtypes: datetime64[ns](1), int64(3), object(3)
memory usage: 552.8+ MB

Out[16]:
addr date serv_answer_1 serv_answer_2 device os browser
0 543614941 2019-01-22 03:56:14 200 30577 parse_bot parse_bot parse_bot
1 31569651 2019-01-22 03:56:16 200 5667 ale-l21 android Chrome
2 31569651 2019-01-22 03:56:16 200 5379 ale-l21 android Chrome
3 4077167129 2019-01-22 03:56:17 200 1696 parse_bot parse_bot parse_bot
4 91997215 2019-01-22 03:56:17 200 41483 x64 windows parse_bot
In [19]:
# получим индекс по уникальным сочетаниям полей 'device' и 'os' для чего сгруппируем
# данные и сбросим индекс, чтобы получить столбец с ключами из значений индекса pd.Series 

index = (
    df
    .groupby(by = ['device', 'os'])['addr']
    .count()
    .sort_index(level = ['device', 'os'])
    .reset_index()
    .reset_index()
    .drop(columns = 'addr')
)

index.tail(5)
Out[19]:
index device os
3376 3376 zte grand s ii lte android
3377 3377 zuk z1 android
3378 3378 zuk z2121 android
3379 3379 zuk z2131 android
3380 3380 zuk z2151 android
In [20]:
# объединим таблицы df и index по полям 'device' и 'os' чтобы добавить поле с ключами к df
# удалять поле 'addr' не станем, т.к. оно позволит выделять разных пользователей устройств

df = (
    df
    .merge(index
           , how = 'left'
           , suffixes = ('', '_index')
           , left_on = ['device', 'os']
           , right_on = ['device', 'os'])
)

# переименуем столбцы 'index' в 'dev_key' и 'addr' в 'user_id'
df.rename(columns = {'index':'dev_key', 'addr':'user_id'}, inplace=True)
print('Таблица приняла следующий вид:')
df.head(3)
Таблица приняла следующий вид:
Out[20]:
user_id date serv_answer_1 serv_answer_2 device os browser dev_key
0 543614941 2019-01-22 03:56:14 200 30577 parse_bot parse_bot parse_bot 1882
1 31569651 2019-01-22 03:56:16 200 5667 ale-l21 android Chrome 76
2 31569651 2019-01-22 03:56:16 200 5379 ale-l21 android Chrome 76

Сохранение датафрейма.¶

Выполним сохранение датафрейма в формате Parquet в ходе дальнейшей работы обращаться можно будет как к полученному файлу, так и к переменной *df*, хранящей pandas.DataFrame.

In [21]:
%time

# сохраним датафрейм в формате parque и откроем его 
# (для проверки свойств полученного файла: скорость загрузки, форматы данных, схема, занимаемый объём памяти)

df.to_parquet(path='../morozov_ea/output_data/parsed_logs_df.parquet', index=None)

df_parquet = pd.read_parquet('../morozov_ea/output_data/parsed_logs_df.parquet')

print(df_parquet.info())
df_parquet.head(3)
CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 4.77 µs
<class 'pandas.core.frame.DataFrame'>
Int64Index: 10350799 entries, 0 to 10350798
Data columns (total 8 columns):
 #   Column         Dtype         
---  ------         -----         
 0   user_id        int64         
 1   date           datetime64[ns]
 2   serv_answer_1  int64         
 3   serv_answer_2  int64         
 4   device         object        
 5   os             object        
 6   browser        object        
 7   dev_key        int64         
dtypes: datetime64[ns](1), int64(4), object(3)
memory usage: 710.7+ MB
None
Out[21]:
user_id date serv_answer_1 serv_answer_2 device os browser dev_key
0 543614941 2019-01-22 03:56:14 200 30577 parse_bot parse_bot parse_bot 1882
1 31569651 2019-01-22 03:56:16 200 5667 ale-l21 android Chrome 76
2 31569651 2019-01-22 03:56:16 200 5379 ale-l21 android Chrome 76

image.png

5. Transform.¶

Ответы на вопросы задания получим в несколько этапов: как с использованием кластерных вычислений так и в локальном режиме.

5.1. Решение в Pandas.¶

Для ориентира получим ответы на вопросы задания с использованием библиотеки Pandas, затем будем использовать их для сравнения с результатами во фреймворке Spark.


пункты 1-4¶

  • Выведем сперва:

    1. Суррогатный ключ устройства.
    2. Название устройства.
    3. Количество пользователей.
    4. Доля пользователей данного устройства от общего числа пользователей.

In [23]:
# кол-во строк в группе, т.е. кол-во запросов с разных устройств с разным user_id
unique_users_qty = len(df.groupby(['dev_key', 'user_id']))

# датафрейм по п.1-4
df_1_4 = (
    df
    .groupby(['dev_key', 'device', 'user_id'])
    .any()['serv_answer_2']
    .reset_index()
    .groupby('dev_key')
    .agg({'device':'first', 'user_id':'count'})
    .rename(columns={'user_id':'users_qty'})
)

df_1_4['users_ratio'] = np.round((df_1_4['users_qty'] / unique_users_qty), 5)
df_1_4
Out[23]:
device users_qty users_ratio
dev_key
0 2pq93 1 0.00000
1 2pyb2 1 0.00000
2 4035d 2 0.00001
3 5047d 6 0.00002
4 5049w 1 0.00000
... ... ... ...
3376 zte grand s ii lte 4 0.00001
3377 zuk z1 4 0.00001
3378 zuk z2121 15 0.00005
3379 zuk z2131 9 0.00003
3380 zuk z2151 1 0.00000

3381 rows × 3 columns


пункты 5-6¶

  • Сгруппируем данные и дополним витрину информацией о (cуррогатный ключ и название устройства также остаются актуальными):

    5. Количество совершенных действий для данного устройства.
    6. Доля совершенных действий с данного устройства, относительно других устройств.

In [ ]:
# кол-во строк в группе, т.е. кол-во ответов сервера с различными 'serv_answer_1' и 'serv_answer_2'
unique_answers_qty = len(df.groupby(['dev_key', 'serv_answer_1', 'serv_answer_2']))

# датафрейм по п.5-6
df_5_6 = (
    df
    .groupby(['dev_key', 'device', 'serv_answer_1', 'serv_answer_2'])
    .any()['user_id']
    .reset_index()
    .groupby('dev_key')
    .agg({'device':'first', 'serv_answer_2':'count'})
    .rename(columns={'serv_answer_2':'answers_qty'})
)

df_5_6['answers_ratio'] = np.round((df_5_6['answers_qty'] / unique_answers_qty), 6)
df_5_6

image.png


пункт 7 ¶

  • Ещё раз сгруппируем данные и дополним витрину (cуррогатный ключ и название устройства также остаются актуальными) данными о браузерах:

    7. Список из 5 самых популярных браузеров, используемых на данном устройстве различными пользователями, с указанием доли использования для данного браузера относительно остальных браузеров.

In [ ]:
# датафрейм по п.7
df_7 = (
    pd.DataFrame(
        df
        .groupby(['dev_key', 'device', 'browser'])
        .count()['date']
        )
)

# дополнения (временные таблицы) для слияния с df_7 и получения расчётных величин
df_7_br_tot = df_7.copy().reset_index().groupby('browser').agg({'date':'sum'})            #групп-ка по браузеру    
df_7_br_tot['ratio_total'] = np.round(df_7_br_tot['date'] / df_7_br_tot['date'].sum(), 5) #расчёт доли от общего

df_7_br_by_dev = df_7.copy().reset_index().groupby('dev_key').sum()                       #групп-ка по устр-ву
                                                                                          #расчёт кол-ва прим-ий
# объединения таблиц
df_7 = df_7.merge(df_7_br_by_dev, how='left', left_index=True, right_index=True)
df_7['ratio_by_device'] = np.round(df_7['date_x'] / df_7['date_y'], 3)
df_7 = df_7.merge(df_7_br_tot, how='left', left_index=True, right_index=True)

# удаление лишних столбцов, переименование
df_7.drop(columns = ['date_y', 'date'], inplace = True)
df_7.rename(columns = {'date_x' : 'usage_qty'}, inplace = True)
df_7.head()

image.png


пункты 8-9 ¶

  • Дополним витрину (cуррогатный ключ и название устройства также остаются актуальными) данными об ответах сервера:

    8. Количество ответов сервера, отличных от 200, на данном устройстве.
    9. Для каждого из ответов сервера, отличных от 200, сформировать поле, в котором будет содержаться количество ответов данного типа.

In [27]:
# датафрейм для п.8-9 без строк с отвтеом 200
df_8_9 = (
    pd.DataFrame(
        df
        .loc[df.loc[:, 'serv_answer_1'] != 200, :]
        .groupby(['dev_key', 'device', 'serv_answer_1'])
        .count()['serv_answer_2']
    ).rename(columns = {'serv_answer_2' : 'not200_answers_qty'})
)

# добавляем столбец с суммарным кол-вом ответов, отличных от 200 + transform для распространения на все строки    
df_8_9['not200_sum_by_device'] = df_8_9.groupby(level = 'dev_key').transform('sum')

# меняем порядок следования столбцов
df_8_9 = df_8_9.reindex(columns = ['not200_sum_by_device', 'not200_answers_qty'])
df_8_9
Out[27]:
not200_sum_by_device not200_answers_qty
dev_key device serv_answer_1
1 2pyb2 302 2 2
2 4035d 302 8 8
3 5047d 302 2 2
4 5049w 301 2 1
302 2 1
... ... ... ... ...
3377 zuk z1 404 11 3
499 11 1
3378 zuk z2121 302 4 4
3379 zuk z2131 302 7 5
499 7 2

4794 rows × 2 columns

В целях унификации можно объединить ответы п.1-6, но датафреймы п.7 и п.8-9 получились не столь подобными, их имеет смысл рассматривать отдельно.

In [28]:
# витрина по пунктам 1-6 задания
df_1_6 = (
    df_1_4
    .merge(df_5_6
           , how='left'
           , left_index = True
           , right_index = True
           , suffixes = ('','_right'))
    .drop(columns = ['device_right'])
)

df_1_6.head()
Out[28]:
device users_qty users_ratio answers_qty answers_ratio
dev_key
0 2pq93 1 0.00000 1 0.000001
1 2pyb2 1 0.00000 139 0.000108
2 4035d 2 0.00001 26 0.000020
3 5047d 6 0.00002 83 0.000064
4 5049w 1 0.00000 137 0.000106
In [29]:
# витрина по пункту 7 задания
df_7.head()
Out[29]:
usage_qty ratio_by_device ratio_total
dev_key device browser
0 2pq93 Chrome 1 1.000 0.48032
1 2pyb2 Chrome 160 0.976 0.48032
parse_bot 4 0.024 0.15273
2 4035d Chrome 39 1.000 0.48032
3 5047d Chrome 84 0.840 0.48032
In [30]:
# витрина по пунктам 8-9 задания
df_8_9.head()
Out[30]:
not200_sum_by_device not200_answers_qty
dev_key device serv_answer_1
1 2pyb2 302 2 2
2 4035d 302 8 8
3 5047d 302 2 2
4 5049w 301 2 1
302 2 1

5.2. Решение в Spark.¶

Для курса по инженерии данных представляется логичным воспользоваться иными инструментами data engeneer: в первую очередь инструментами стэка для работы с параллельными вычислениями и распределёнными данными, - и перенести решение на эту платформу.

Первейшим из таких инструментов выглядит использование фреймворка Apache Spark.

In [31]:
# создаём Spark-сессию 
# (часто при первом запуске вылетал с ошибкой, вдобавок хотелось думать, что можно запуститься на кластере
# поэтому методом тыка была отработана техника для входа сначала c master("local"), а потом c "yarn")

for mode in ['local', 'yarn']: 
    print('mode --- ', mode)
    spark = (
        SparkSession
        .builder
        .master(mode)
        .appName("spark_pandas")
        .getOrCreate()
    )

# проверка работоспособности
spark.sql('select "spark" as hello').show()
mode ---  local
23/01/09 00:44:43 WARN Utils: Your hostname, data-analysis resolves to a loopback address: 127.0.1.1; using 146.120.224.166 instead (on interface ens160)
23/01/09 00:44:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/01/09 00:44:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
mode ---  yarn
23/01/09 00:44:46 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
+-----+
|hello|
+-----+
|spark|
+-----+

In [33]:
df = (
    spark
    .read
    .format('parquet')
    #.option("'path', '../... или file://....') # путь можно указать в .option()
    .load('../morozov_ea/output_data/parsed_logs_df.parquet')
)

# выведем схему датафрейма и посмотрим тип объекта
df.printSchema()
type(df)
root
 |-- user_id: long (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- serv_answer_1: long (nullable = true)
 |-- serv_answer_2: long (nullable = true)
 |-- device: string (nullable = true)
 |-- os: string (nullable = true)
 |-- browser: string (nullable = true)
 |-- dev_key: long (nullable = true)
 |-- __index_level_0__: long (nullable = true)

Out[33]:
pyspark.sql.dataframe.DataFrame
In [34]:
# конвертируем pyspark.sql.dataframe.DataFrame в датафрейм pandas-API и изменим название индекса
pds_df = df.pandas_api(index_col='__index_level_0__')
pds_df.index.rename('index', inplace= True)
display(pds_df.head(3))
type(pds_df)
/opt/tljh/user/lib/python3.9/site-packages/pyspark/sql/pandas/conversion.py:248: FutureWarning: Passing unit-less datetime64 dtype to .astype is deprecated and will raise in a future version. Pass 'datetime64[ns]' instead
  series = series.astype(t, copy=False)
user_id date serv_answer_1 serv_answer_2 device os browser dev_key
index
0 543614941 2019-01-22 03:56:14 200 30577 parse_bot parse_bot parse_bot 1882
1 31569651 2019-01-22 03:56:16 200 5667 ale-l21 android Chrome 76
2 31569651 2019-01-22 03:56:16 200 5379 ale-l21 android Chrome 76
Out[34]:
pyspark.pandas.frame.DataFrame

Помимо решения непосредственно методами API SPARK SQL сравнивалось решение на Pandas-API. Ниже это показано только на примере 1 задания, более развёрнутое сравнение приводится в работе непосредственно.


Задания 1-4.¶

--- Pandas API ---¶

  • Выведем:

    1. Суррогатный ключ устройства.
    2. Название устройства.
    3. Количество пользователей.
    4. Доля пользователей данного устройства от общего числа пользователей.

In [36]:
# кол-во запросов с разных устройств с разным user_id
unique_users_qty = pds_df.groupby(['dev_key', 'user_id']).count().count()[0]

# датафрейм по п.1-4
pds_df_1_4 = (
    pds_df
    .groupby(['dev_key', 'device', 'user_id'])
    .any()['serv_answer_2']
    .reset_index()
    .groupby('dev_key')
    .agg({'device':'first', 'user_id':'count'})
    .rename(columns={'user_id':'users_qty'})
)

pds_df_1_4['users_ratio_1'] = unique_users_qty
pds_df_1_4['users_ratio'] = pds_df_1_4['users_qty'] / pds_df_1_4['users_ratio_1']
pds_df_1_4 = pds_df_1_4.drop(columns = 'users_ratio_1')
pds_df_1_4.head(20)
/opt/tljh/user/lib/python3.9/site-packages/pyspark/pandas/internal.py:1573: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
  fields = [
/opt/tljh/user/lib/python3.9/site-packages/pyspark/sql/pandas/conversion.py:486: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
  for column, series in pdf.iteritems():
                                                                                
Out[36]:
device users_qty users_ratio
dev_key
0 2pq93 1 0.000004
1 2pyb2 1 0.000004
2 4035d 2 0.000007
3 5047d 6 0.000021
4 5049w 1 0.000004
5 5050 4 0.000014
6 5050x 1 0.000004
7 5095k 1 0.000004
8 6043d 4 0.000014
9 6p 16 0.000057
10 702so 1 0.000004
11 706 3 0.000011
12 708g 1 0.000004
13 709a 1 0.000004
14 7_plus 2 0.000007
15 8030y 1 0.000004
16 8_plus 2 0.000007
17 9002x 2 0.000007
18 9003x 3 0.000011
19 9007a 6 0.000021

С некоторыми корректировками, вызванными неполным переносом всех методов в API Pandas из родной библиотеки, задача принципиально была решена в том же виде, что и в нативном pandas с повторением всех процедур. Тот же результат получается с использованием spark.sql методов (ниже).

--- PySpark ---¶

In [37]:
# в переменной сохраним число записей в сгрупированном объекте 
unique_users_qty_s = df.groupBy('dev_key', 'device', 'user_id').count().count()

# датафрейм по п.1-4
s_df_1_4 = (
    df
    .groupBy('dev_key', 'device', 'user_id')
    .count()
    .select('dev_key', 'device', 'user_id')
    .groupBy('dev_key', 'device')
    .agg({'user_id' : 'count'})
    .toDF('dev_key', 'device', 'users_qty')
    .select('dev_key', 'device', 'users_qty')
    .withColumn('users_ratio', f.round(f.col('users_qty') / unique_users_qty_s, 5))
    .orderBy(f.asc('dev_key'))
)

s_df_1_4.show(20, truncate=False)
[Stage 22:>                                                         (0 + 1) / 1]
+-------+------+---------+-----------+
|dev_key|device|users_qty|users_ratio|
+-------+------+---------+-----------+
|0      |2pq93 |1        |0.0        |
|1      |2pyb2 |1        |0.0        |
|2      |4035d |2        |1.0E-5     |
|3      |5047d |6        |2.0E-5     |
|4      |5049w |1        |0.0        |
|5      |5050  |4        |1.0E-5     |
|6      |5050x |1        |0.0        |
|7      |5095k |1        |0.0        |
|8      |6043d |4        |1.0E-5     |
|9      |6p    |16       |6.0E-5     |
|10     |702so |1        |0.0        |
|11     |706   |3        |1.0E-5     |
|12     |708g  |1        |0.0        |
|13     |709a  |1        |0.0        |
|14     |7_plus|2        |1.0E-5     |
|15     |8030y |1        |0.0        |
|16     |8_plus|2        |1.0E-5     |
|17     |9002x |2        |1.0E-5     |
|18     |9003x |3        |1.0E-5     |
|19     |9007a |6        |2.0E-5     |
+-------+------+---------+-----------+
only showing top 20 rows

                                                                                

Стоит отметить, что пользователь, запустив Spark, получает "из коробки" возможность работы на API Pandas, SQL а также pySpark, что по широте возможностей, кажется, делает инструмент мощным и довольно безальтернативным.


Задания 5-6.¶

--- Pandas API ---¶

  • Сгруппируем данные и дополним витрину информацией о:

    5. Количество совершенных действий для данного устройства.
    6. Доля совершенных действий с данного устройства, относительно других устройств.

.........

--- PySpark ---¶

In [41]:
# в переменной сохраним число записей в сгрупированном объекте 
unique_answers_qty_s = df.groupBy('dev_key', 'serv_answer_1', 'serv_answer_2').count().count()

# датафрейм по п.5-6
s_df_5_6 = (
    df
    .groupBy('dev_key', 'device', 'serv_answer_1', 'serv_answer_2')
    .count()
    .select('dev_key', 'device', 'serv_answer_1', 'serv_answer_2')
    #.orderBy(f.asc('dev_key'), f.asc('serv_answer_1'), f.asc('serv_answer_2'))
    .groupBy('dev_key', 'device')
    .agg({'serv_answer_2' : 'count'})
    .toDF('dev_key', 'device', 'actions_qty')
    .withColumn('actions_ratio_by_device', f.round(f.col('actions_qty') / unique_answers_qty_s, 10))
    .orderBy(f.asc('dev_key'))
)

s_df_5_6.show(10, truncate=False)
[Stage 95:>                                                         (0 + 1) / 1]
+-------+------+-----------+-----------------------+
|dev_key|device|actions_qty|actions_ratio_by_device|
+-------+------+-----------+-----------------------+
|0      |2pq93 |1          |7.771E-7               |
|1      |2pyb2 |139        |1.08016E-4             |
|2      |4035d |26         |2.02044E-5             |
|3      |5047d |83         |6.44988E-5             |
|4      |5049w |137        |1.064618E-4            |
|5      |5050  |43         |3.3415E-5              |
|6      |5050x |3          |2.3313E-6              |
|7      |5095k |18         |1.39877E-5             |
|8      |6043d |58         |4.50714E-5             |
|9      |6p    |145        |1.126786E-4            |
+-------+------+-----------+-----------------------+
only showing top 10 rows

                                                                                

Решение получилось идентичным с результатом работы на Pandas. Продолжим выполнение оставшихся заданий с использованием Spark.


Задание 7. ¶

--- Pandas API ---¶

  • Ещё раз сгруппируем данные и дополним витрину данными о браузерах:

    7. Список из 5 самых популярных браузеров, используемых на данном устройстве различными пользователями, с указанием доли использования для данного браузера относительно остальных браузеров.

.........

--- PySpark ---¶

In [43]:
# датафрейм по п.7

#  первичная группировка и преобразование ps.Series к ps.DataFrame (для удобства)
s_df_7 = (
    df
    .groupby(['dev_key', 'device', 'browser'])
    .count()
    .orderBy(f.col('dev_key').asc())
)

# дополнения (временные таблицы) для слияния с df_7 и получения расчётных величин
s_df_7_br_by_dev = (                                               # группир-ка по устр-ву
    s_df_7                                                         # расчёт кол-ва применений
    .groupby(['dev_key','device'])
    .sum('count')
    .toDF('dev_key', 'device', 'sum')
    .orderBy('dev_key')
)

# объединение таблиц
s_df_7 = (
    s_df_7
    .join(s_df_7_br_by_dev, on=['dev_key', 'device'], how='left')
    .orderBy(f.col('dev_key').asc())
)

# создание новой колонки со значениями
s_df_7 = s_df_7.withColumn('ratio_by_device', f.col('count')/f.col('sum'))

s_df_7_br_tot = (                                      # группир-ка по браузеру
    df                                                 # расчёт доли от общего
    .groupby(['browser'])
    .count()
)

# добавление колонки с долей от общего кол-ва
tot_count = s_df_7_br_tot.select(f.sum('count')).collect()

s_df_7_br_tot = (
    s_df_7_br_tot
    .withColumn('ratio_total', f.col('count')/tot_count[0][0])
    .select(['browser', 'ratio_total'])
)

# объединение таблиц
s_df_7 = (
    s_df_7
    .join(s_df_7_br_tot, on=['browser'], how='left')
    .select(['dev_key', 'device', 'browser', 'count', 'ratio_by_device', 'ratio_total'])
    .orderBy(f.col('dev_key').asc(), f.col('count').desc())
)

s_df_7.show(10)
                                                                                
+-------+------+---------+-----+--------------------+-------------------+
|dev_key|device|  browser|count|     ratio_by_device|        ratio_total|
+-------+------+---------+-----+--------------------+-------------------+
|      0| 2pq93|   Chrome|    1|                 1.0|0.48032311322053495|
|      1| 2pyb2|   Chrome|  160|   0.975609756097561|0.48032311322053495|
|      1| 2pyb2|parse_bot|    4|0.024390243902439025|0.15272705034654813|
|      2| 4035d|   Chrome|   39|                 1.0|0.48032311322053495|
|      3| 5047d|   Chrome|   84|                0.84|0.48032311322053495|
|      3| 5047d|parse_bot|   16|                0.16|0.15272705034654813|
|      4| 5049w|   Chrome|  149|                 1.0|0.48032311322053495|
|      5|  5050|   Chrome|   40|  0.9302325581395349|0.48032311322053495|
|      5|  5050|parse_bot|    3| 0.06976744186046512|0.15272705034654813|
|      6| 5050x|parse_bot|    3|                 1.0|0.15272705034654813|
+-------+------+---------+-----+--------------------+-------------------+
only showing top 10 rows


Задания 8-9.¶

--- Pandas API ---¶

  • Дополним витрину данными об ответах сервера:

    8. Количество ответов сервера, отличных от 200, на данном устройстве.
    9. Для каждого из ответов сервера, отличных от 200, сформировать поле, в котором будет содержаться количество ответов данного типа.

In [44]:
# датафрейм для п.8-9 без строк с отвтеом 200

pds_df_8_9 = (
    ps.DataFrame(
        pds_df
        .loc[pds_df.loc[:, 'serv_answer_1'] != 200, :]
        .groupby(['dev_key', 'device', 'serv_answer_1'])
        .count()['serv_answer_2']
    ).rename(columns = {'serv_answer_2' : 'not200_answers_qty'})
     .sort_index()
)

# добавляем столбец с суммарным кол-вом ответов, отличных от 200
# метод .transform('sum') не поддерживает тот же функционал, что в pandas, надёжнее было объединить с пом. join
sum_by_device = (
    pds_df_8_9
    .reset_index()
    .groupby('dev_key')
    .sum()
    .sort_index()
)

pds_df_8_9 = (
    pds_df_8_9
    .reset_index()
    .set_index('dev_key')
    .merge(sum_by_device, how='left', left_index=True, right_index=True)
)

# меняем порядок следования столбцов, удаляем повторяющиеся, переименовываем
pds_df_8_9 = pds_df_8_9.drop(['serv_answer_1_y'], axis = 1)
pds_df_8_9 = (
    pds_df_8_9
    .rename(columns = {'serv_answer_1_x':'serv_answer_1', 'not200_answers_qty_y':'not200_sum_by_device'
                       , 'not200_answers_qty_x':'not200_answers_qty'})
    .reset_index()
    .set_index(['dev_key', 'device', 'serv_answer_1'])
    .reindex(columns=['not200_sum_by_device', 'not200_answers_qty'])
)

pds_df_8_9.head(10)
                                                                                
Out[44]:
not200_sum_by_device not200_answers_qty
dev_key device serv_answer_1
1 2pyb2 302 2 2
2 4035d 302 8 8
3 5047d 302 2 2
4 5049w 301 2 1
302 2 1
5 5050 302 1 1
7 5095k 302 1 1
9 6p 302 8 1
304 8 7
12 708g 302 1 1

--- PySpark ---¶

Для экономии времени в этом месте вызовем метод .to_spark к полученному выше датафрейму pds_df_8_9. Он позволит получить из pyspark.pandas.frame.DataFrame объект pyspark.sql.dataframe.DataFrame без доп. преобразований.

In [45]:
s_df_8_9 = pds_df_8_9.to_spark(index_col=['dev_key', 'device', 'serv_answer_1'])
s_df_8_9.show()
                                                                                
+-------+--------+-------------+--------------------+------------------+
|dev_key|  device|serv_answer_1|not200_sum_by_device|not200_answers_qty|
+-------+--------+-------------+--------------------+------------------+
|      1|   2pyb2|          302|                   2|                 2|
|      2|   4035d|          302|                   8|                 8|
|      3|   5047d|          302|                   2|                 2|
|      4|   5049w|          301|                   2|                 1|
|      4|   5049w|          302|                   2|                 1|
|      5|    5050|          302|                   1|                 1|
|      7|   5095k|          302|                   1|                 1|
|      9|      6p|          302|                   8|                 1|
|      9|      6p|          304|                   8|                 7|
|     12|    708g|          302|                   1|                 1|
|     17|   9002x|          302|                   1|                 1|
|     19|   9007a|          302|                   2|                 2|
|     20|    a 3g|          499|                   1|                 1|
|     21|   a0001|          304|                   2|                 1|
|     21|   a0001|          499|                   2|                 1|
|     23|  a1-713|          302|                   2|                 1|
|     23|  a1-713|          404|                   2|                 1|
|     24|a1-713hd|          301|                  32|                 2|
|     24|a1-713hd|          302|                  32|                17|
|     24|a1-713hd|          304|                  32|                11|
+-------+--------+-------------+--------------------+------------------+
only showing top 20 rows


По-прежнему в целях унификации можно объединить витрины по пунктам 1-6, датафреймы п.7 и п.8-9 получились разнородными, их имеет смысл рассматривать отдельно.

In [46]:
# объединим витрины п.1-4 и 5-6
s_df_1_6 = (
    s_df_1_4
    .join(s_df_5_6, how='left', on=['dev_key', 'device'])
    .toDF('dev_key', 'device', 'users_qty', 'users_ratio', 'actions_qty', 'actions_ratio_by_device')
)
print('Витрина данных по п.1-6 (фрагмент)')
s_df_1_6.show(10)
Витрина данных по п.1-6 (фрагмент)
[Stage 183:>                                                        (0 + 1) / 1]
+-------+------+---------+-----------+-----------+-----------------------+
|dev_key|device|users_qty|users_ratio|actions_qty|actions_ratio_by_device|
+-------+------+---------+-----------+-----------+-----------------------+
|      0| 2pq93|        1|        0.0|          1|               7.771E-7|
|      1| 2pyb2|        1|        0.0|        139|             1.08016E-4|
|      2| 4035d|        2|     1.0E-5|         26|             2.02044E-5|
|      3| 5047d|        6|     2.0E-5|         83|             6.44988E-5|
|      4| 5049w|        1|        0.0|        137|            1.064618E-4|
|      5|  5050|        4|     1.0E-5|         43|              3.3415E-5|
|      6| 5050x|        1|        0.0|          3|              2.3313E-6|
|      7| 5095k|        1|        0.0|         18|             1.39877E-5|
|      8| 6043d|        4|     1.0E-5|         58|             4.50714E-5|
|      9|    6p|       16|     6.0E-5|        145|            1.126786E-4|
+-------+------+---------+-----------+-----------+-----------------------+
only showing top 10 rows

                                                                                
In [47]:
# отобразим витрину по п.7
print('Витрина данных по п.7 (фрагмент)')
s_df_7.show(10)

# отобразим витрину по п.8-9
print('Витрина данных по п.8-9 (фрагмент)')
s_df_8_9.show(10)
Витрина данных по п.7 (фрагмент)
                                                                                
+-------+------+---------+-----+--------------------+-------------------+
|dev_key|device|  browser|count|     ratio_by_device|        ratio_total|
+-------+------+---------+-----+--------------------+-------------------+
|      0| 2pq93|   Chrome|    1|                 1.0|0.48032311322053495|
|      1| 2pyb2|   Chrome|  160|   0.975609756097561|0.48032311322053495|
|      1| 2pyb2|parse_bot|    4|0.024390243902439025|0.15272705034654813|
|      2| 4035d|   Chrome|   39|                 1.0|0.48032311322053495|
|      3| 5047d|   Chrome|   84|                0.84|0.48032311322053495|
|      3| 5047d|parse_bot|   16|                0.16|0.15272705034654813|
|      4| 5049w|   Chrome|  149|                 1.0|0.48032311322053495|
|      5|  5050|   Chrome|   40|  0.9302325581395349|0.48032311322053495|
|      5|  5050|parse_bot|    3| 0.06976744186046512|0.15272705034654813|
|      6| 5050x|parse_bot|    3|                 1.0|0.15272705034654813|
+-------+------+---------+-----+--------------------+-------------------+
only showing top 10 rows

Витрина данных по п.8-9 (фрагмент)
                                                                                
+-------+------+-------------+--------------------+------------------+
|dev_key|device|serv_answer_1|not200_sum_by_device|not200_answers_qty|
+-------+------+-------------+--------------------+------------------+
|      1| 2pyb2|          302|                   2|                 2|
|      2| 4035d|          302|                   8|                 8|
|      3| 5047d|          302|                   2|                 2|
|      4| 5049w|          301|                   2|                 1|
|      4| 5049w|          302|                   2|                 1|
|      5|  5050|          302|                   1|                 1|
|      7| 5095k|          302|                   1|                 1|
|      9|    6p|          302|                   8|                 1|
|      9|    6p|          304|                   8|                 7|
|     12|  708g|          302|                   1|                 1|
+-------+------+-------------+--------------------+------------------+
only showing top 10 rows


image-2.png

6. Load.¶

Результат всех способов преобразования получается одинаковый, в этом смысле разницы, какой из них выбирать - нет. Попробуем на данном спектре вариантов решения разобраться с трансляцией полученного результата в базы данных, а также с записью и хранением в Hive и HDFS.

Работа с базами данных.¶

  • чтение из PostgreSQL

    Чтение из базы данных с помощью Spark представляется простым, т.к. в метод .read передается минимум параметров.

# последовательность вызовов на чтение из БД
(
    spark
    .read
    .format('jdbc')
    .option('url', 'jdbc:postgresql://146.120.224.166:5432/morozov_evgeny_db')
    .option('dbtable', 'db')
    .option('user', 'morozov_evgeny')
    .option('password', 'w*******')
    .load()
)

Код в работоспособное состояние привести не удалось, последовательность выше (как впрочем и ниже при попытке записи в БД) приводила к ошибкам.

  • запись в PostgreSQL

    Запись в базу данных с помощью Spark также не представляется сложной, в метод .write передаются соотв. аргументы для подключения и записи, нужно только учитывать, что метод .write.format('jdbc')... у датафреймов pandas API отсутствует и существует только для датафреймов pyspark.sql.dataframe.DataFrame.

В данной работе чтение и запись в итоге произведены не были, поскольку на сервере либо не были установлены драйверов БД, либо их местоположение осталось мне неизвестно.

#  последовательность вызовов для записи в ДБ
(
    s_df_1_6
    #.repartition(3)   # просто для напомнинаия о партиционировании при записи в HDFS (к БД не имеет отношения)
    .write
    .format('jdbc')
    .mode('overwrite')
    .option('url', 'jdbc:postgresql://localhost:5432/morozov_evgeny_db')
    #.option('url', 'jdbc:postgresql://146.120.224.166:5432/morozov_evgeny_db')
    .option('dbtable', 'case_1_6')
    .option('user', 'morozov_evgeny')
    .option('password', 'w*******')
    .save()
)
  • запись и чтение в реляционное хранилище Hive

    Запись в БД Hive не требует установки драйверов и похожа на запись в простой файл, с той разницей, что вместо метода .save() вызывается метод .saveAsTable().

# команды отрабатывали только с методом .save() на конце, но файл "улетал" в неизвестном направлении, видимо.
# в HUE найден не был, сам HUE тоже был доступен не каждый раз. Поэтому последовательность команд перенес в 
# markdown-ячейку.
(
    s_df_1_4
    .repartition(3)   # партиционирование при записи в HDFS
    .write
    .format('orc')
    #.partitionBy('device')
    .mode('overwrite')
    #.option('compression','gzip')
    .save('hdfs://localhost:9099/user/morozov_evgeny/')
)
In [51]:
! hdfs dfs -ls -R hdfs://localhost:9099/user/morozov_evgeny/
/bin/bash: hdfs: command not found

Изначально казалось, что в данной сессии есть возможность писать в HDFS, но по всей видимости работа в 'local'-режиме не предусматривает такой возможности...

Отрицательный опыт - тоже опыт, удалять эту часть работы не станем, поскольку есть определённый потенциал к её решению и он может быть развит. Поскольку начинания успехом не увенчались, то используем метод, зарекомендовавший себя как более отлаженный.


Ещё ранее в папке с запущенным проектом Spark создал директорию ./spark-warehouse для хранения метаданных таблиц, в ней же Hive будет по умолчанию хранить созданные таблицы.

Запишем получившиеся витрины и продублируем итоги в папку "output_data", поскольку способы выше не дали желаемых результатов, такой способ также можно считать завершением этапа LOAD и выполнением работы.

In [52]:
#  вручную пропишем последовательность вызовов для записи в Hive
(
    s_df_1_6
    .repartition(3)   # оставил для напомнинаия о партиционировании при записи
    .write
    .format('parquet')
    #.partitionBy('device')
    .mode('overwrite')
    #.option('compression','gzip')
    .saveAsTable('case_1_6_parquet')
)
(
    s_df_7
    .repartition(3)
    .write
    .format('parquet')
    #.partitionBy('device')
    .mode('overwrite')
    #.option('compression','gzip')
    .saveAsTable('case_7_parquet')
)
(
    s_df_8_9
    .repartition(3)
    .write
    .format('parquet')
    #.partitionBy('device')
    .mode('overwrite')
    #.option('compression','gzip')
    .saveAsTable('case_8_9_parquet')
)
                                                                                

Заглянем в директорию spark-warehouse, там можно найти созданные папки вида case_*_parquet с партиционированными файлами витрин и файлом-флагом _SUCCESS, которые говорят об успешности записи бинарных файлов.

In [53]:
! ls -lh spark-warehouse/
total 12K
drwxr-xr-x 2 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Jan  9 00:46 case_1_6_parquet
drwxr-xr-x 2 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Jan  9 00:46 case_7_parquet
drwxr-xr-x 2 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Jan  9 00:46 case_8_9_parquet

Для открытия файла можно использовать более быстрый (чем .read.format().option().load()) метод spark.sql() с соотв. запросом к указанной таблице.

In [55]:
# в языке Hive_QL есть команды для отображения таблиц БД 
spark.sql('show tables').show()
+---------+----------------+-----------+
|namespace|       tableName|isTemporary|
+---------+----------------+-----------+
|  default|case_1_6_parquet|      false|
|  default|  case_7_parquet|      false|
|  default|case_8_9_parquet|      false|
|         |             1_4|       true|
+---------+----------------+-----------+

In [56]:
# обратимся к Hive для загрузки
hivedf = spark.sql('SELECT * FROM case_1_6_parquet')
hivedf.show(10)
hivedf.printSchema()
print('Количество строк case_1_6_parquet =', hivedf.count())
+-------+--------------------+---------+-----------+-----------+-----------------------+
|dev_key|              device|users_qty|users_ratio|actions_qty|actions_ratio_by_device|
+-------+--------------------+---------+-----------+-----------+-----------------------+
|   2181|               s4700|       24|     9.0E-5|        153|            1.188953E-4|
|   3113|           sph-l720t|        2|     1.0E-5|         69|             5.36195E-5|
|   1887|               pc704|        2|     1.0E-5|         24|             1.86503E-5|
|   3249|             vsun h3|        1|        0.0|         83|             6.44988E-5|
|   3188|             trt-lx1|        1|        0.0|          1|               7.771E-7|
|   2074|redmi note 4 miui...|        1|        0.0|          5|              3.8855E-6|
|   2583|              selfie|       19|     7.0E-5|         93|             7.22697E-5|
|    560|            gt-s5670|        4|     1.0E-5|         13|             1.01022E-5|
|   2221|    samsung gt-s7270|        1|        0.0|         12|              9.3251E-6|
|   2999|           sm-t116nu|        8|     3.0E-5|        148|            1.150099E-4|
+-------+--------------------+---------+-----------+-----------+-----------------------+
only showing top 10 rows

root
 |-- dev_key: long (nullable = true)
 |-- device: string (nullable = true)
 |-- users_qty: long (nullable = true)
 |-- users_ratio: double (nullable = true)
 |-- actions_qty: long (nullable = true)
 |-- actions_ratio_by_device: double (nullable = true)

Количество строк case_1_6_parquet = 3381

7. Вывод.¶

In [57]:
# в качестве иллюстрации
print('Работа завершена, останавливаем сессию.')
spark.stop()
Работа завершена, останавливаем сессию.
In [58]:
print('Полученные витрины данных и датафрейм с распарсенными логами в директории',
      '\033[1m' + 'output_data' + '\033[0m:')
! ls -lh ./output_data/
Полученные витрины данных и датафрейм с распарсенными логами в директории output_data:
total 112M
drwxr-xr-x 2 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Jan  8 01:42 case_1_6_parquet
drwxr-xr-x 2 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Jan  8 01:42 case_7_parquet
drwxr-xr-x 2 jupyter-morozov_evgeny jupyter-morozov_evgeny 4.0K Jan  8 01:42 case_8_9_parquet
-rw-r--r-- 1 jupyter-morozov_evgeny jupyter-morozov_evgeny 112M Jan  9 00:42 parsed_logs_df.parquet
  • Была предпринята попытка реализации логики обработки данных, условно принятых за "большие", последовательное формулирование целей работы и подбор инструментов для их реализаци заняли значительное время, но результат всё-таки был получен.

  • Таким образом была отработана возможность переноса методов Pandas в мир Spark и сделано их сопоставление как представителей разных "парадигм" вычислений: мира последовательных (хоть и в немалой степени оптимизированных) вычислений и распределенных/отложеных вычислений.Проведено натурное исследование взаимодействия программных платформ для обработки "больших данных".

  • В настоящее время, с включением Pandas API в состав low-level API Spark, количество способов обработки только возросло, но взаимодействие между библиотеками оказалось удивиельно "гибким" и "прозрачным". На скорости исполнения кода такие нововведения практически не сказываются, на мой взгляд, т.к. все оболочки (от Scala-shell и Python-shell до API SQL и API Pandas) в конечном счёте вызывают одни и теже методы движка Spark для работы с RDD, предварительно подвергаясь оптимизациям, что должно практически уравнивать методы.

Примененный способ показался довольно лаконичным (если не рассматривать исследовательскую составляющую), вариативным и включающим большую часть привитых в курсе навыков. Полученный опыт позволит применять навыки в трудовой деятельности без затруднений и, надеюсь, в скором времени.

image.png